from twisted.internet import protocol
from twisted.internet import abstract
from twisted.internet import defer
-#defer.Deferred.debug = 1
from xen.lowlevel import xu
DEBUG = 1
-class MgmtProtocol(protocol.DatagramProtocol):
- """Handler for the management socket (unix-domain).
- """
-
- def __init__(self, daemon):
- #protocol.DatagramProtocol.__init__(self)
- self.daemon = daemon
-
- def write(self, data, addr):
- return self.transport.write(data, addr)
-
- def datagramReceived(self, data, addr):
- if DEBUG: print 'datagramReceived> addr=', addr, 'data=', data
- io = StringIO.StringIO(data)
- try:
- vals = sxp.parse(io)
- res = self.dispatch(vals[0])
- self.send_result(addr, res)
- except SystemExit:
- raise
- except:
- if DEBUG:
- raise
- else:
- self.send_error(addr)
-
- def send_reply(self, addr, sxpr):
- io = StringIO.StringIO()
- sxp.show(sxpr, out=io)
- io.seek(0)
- self.write(io.getvalue(), addr)
-
- def send_result(self, addr, res):
-
- def fn(res, self=self, addr=addr):
- self.send_reply(addr, ['ok', res])
-
- if isinstance(res, defer.Deferred):
- res.addCallback(fn)
- else:
- fn(res)
-
- def send_error(self, addr):
- (extype, exval) = sys.exc_info()[:2]
- self.send_reply(addr, ['err',
- ['type', str(extype) ],
- ['value', str(exval) ] ] )
-
- def opname(self, name):
- """Get the name of the method for an operation.
- """
- return 'op_' + name.replace('.', '_')
-
- def operror(self, name, v):
- """Default operation handler - signals an error.
- """
- raise NotImplementedError('Invalid operation: ' +name)
-
- def dispatch(self, req):
- """Dispatch a request to its handler.
- """
- op_name = sxp.name(req)
- op_method_name = self.opname(op_name)
- op_method = getattr(self, op_method_name, self.operror)
- return op_method(op_name, req)
-
- def op_console_create(self, name, req):
- """Create a new control interface - console for a domain.
- """
- print name, req
- dom = sxp.child_value(req, 'domain')
- if not dom: raise XendError('Missing domain')
- dom = int(dom)
- console_port = sxp.child_value(req, 'console_port')
- if console_port:
- console_port = int(console_port)
- resp = self.daemon.console_create(dom, console_port).sxpr()
- print name, resp
- return resp
-
- def op_consoles(self, name, req):
- """Get a list of the consoles.
- """
- return self.daemon.consoles()
-
- def op_console_disconnect(self, name, req):
- id = sxp.child_value(req, 'id')
- if not id:
- raise XendError('Missing console id')
- id = int(id)
- console = self.daemon.get_console(id)
- if not console:
- raise XendError('Invalid console id')
- if console.conn:
- console.conn.loseConnection()
- return ['ok']
-
- def op_blkifs(self, name, req):
- pass
-
- def op_blkif_devs(self, name, req):
- pass
-
- def op_blkif_create(self, name, req):
- pass
-
- def op_blkif_dev_create(self, name, req):
- pass
-
- def op_netifs(self, name, req):
- pass
-
- def op_netif_devs(self, name, req):
- pass
-
- def op_netif_create(self, name, req):
- pass
-
- def op_netif_dev_create(self, name, req):
- pass
-
class NotifierProtocol(protocol.Protocol):
"""Asynchronous handler for i/o on the notifier (event channel).
"""
del self.d
def doRead(self):
- #print 'NotifierPort>doRead>', self
count = 0
while 1:
- #print 'NotifierPort>doRead>', count
notification = self.notifier.read()
if not notification:
break
self.protocol.notificationReceived(notification)
self.notifier.unmask(notification)
count += 1
- #print 'NotifierPort>doRead<'
class EventProtocol(protocol.Protocol):
"""Asynchronous handler for a connected event socket.
err = 1
print "Daemon already running: ", pids
return err
-
- def cleanup(self, kill=False):
- # No cleanup to do if PID_FILE is empty.
- if not os.path.isfile(PID_FILE) or not os.path.getsize(PID_FILE):
- return 0
- # Read the pid of the previous invocation and search active process list.
- pid = open(PID_FILE, 'r').read()
- lines = os.popen('ps ' + pid + ' 2>/dev/null').readlines()
- for line in lines:
- if re.search('^ *' + pid + '.+xend', line):
- if not kill:
- print "Daemon is already running (pid %d)" % int(pid)
- return 1
- # Old daemon is still active: terminate it.
- os.kill(int(pid), 1)
- # Delete the stale PID_FILE.
- os.remove(PID_FILE)
- return 0
+ def read_pid(self, pidfile):
+ """Read process id from a file.
+
+ @param pidfile: file to read
+ @return pid or 0
+ """
+ pid = 0
+ if os.path.isfile(pidfile) and os.path.getsize(pidfile):
+ try:
+ pid = open(pidfile, 'r').read()
+ pid = int(pid)
+ except:
+ pid = 0
+ return pid
+
+ def find_process(self, pid, name):
+ """Search for a process.
+
+ @param pid: process id
+ @param name: process name
+ @return: pid if found, 0 otherwise
+ """
+ running = 0
+ if pid:
+ lines = os.popen('ps %d 2>/dev/null' % pid).readlines()
+ exp = '^ *%d.+%s' % (pid, name)
+ for line in lines:
+ if re.search(exp, line):
+ running = pid
+ break
+ return running
+
+ def cleanup_process(self, pidfile, name, kill):
+ """Clean up the pidfile for a process.
+ If a running process is found, kills it if 'kill' is true.
+
+ @param pidfile: pid file
+ @param name: process name
+ @param kill: whether to kill the process
+ @return running process id or 0
+ """
+ running = 0
+ pid = self.read_pid(pidfile)
+ if self.find_process(pid, name):
+ if kill:
+ os.kill(pid, 1)
+ else:
+ running = pid
+ if running == 0 and os.path.isfile(pidfile):
+ os.remove(pidfile)
+ return running
+
+ def cleanup_xend(self, kill=False):
+ return self.cleanup_process(XEND_PID_FILE, "xend", kill)
+
+ def cleanup_xfrd(self, kill=False):
+ return self.cleanup_process(XFRD_PID_FILE, "xfrd", kill)
+
+ def cleanup(self, kill=False):
+ self.cleanup_xend(kill=kill)
+ self.cleanup_xfrd(kill=kill)
+
def install_child_reaper(self):
#signal.signal(signal.SIGCHLD, self.onSIGCHLD)
# Ensure that zombie children are automatically reaped.
while code > 0:
code = os.waitpid(-1, os.WNOHANG)
+ def fork_pid(self, pidfile):
+ """Fork and write the pid of the child to 'pidfile'.
+
+ @param pidfile: pid file
+ @return: pid of child in parent, 0 in child
+ """
+ pid = os.fork()
+ if pid:
+ # Parent
+ pidfile = open(pidfile, 'w')
+ pidfile.write(str(pid))
+ pidfile.close()
+ return pid
+
+ def start_xfrd(self):
+ """Fork and exec xfrd, writing its pid to XFRD_PID_FILE.
+ """
+ if self.fork_pid(XFRD_PID_FILE):
+ # Parent
+ pass
+ else:
+ # Child
+ self.set_user()
+ os.execl("/usr/sbin/xfrd", "xfrd")
+
def start(self, trace=0):
- if self.cleanup(kill=False):
+ xend_pid = self.cleanup_xend()
+ xfrd_pid = self.cleanup_xfrd()
+ if xfrd_pid == 0:
+ self.start_xfrd()
+ if xend_pid > 0:
return 1
# Detach from TTY.
if not DEBUG:
os.setsid()
-
if self.set_user():
return 1
-
self.install_child_reaper()
- # Fork -- parent writes PID_FILE and exits.
- pid = os.fork()
- if pid:
- # Parent
- pidfile = open(PID_FILE, 'w')
- pidfile.write(str(pid))
- pidfile.close()
- return 0
- # Child
- self.tracing(trace)
- self.run()
+ if self.fork_pid(XEND_PID_FILE):
+ #Parent
+ pass
+ else:
+ # Child
+ self.tracing(trace)
+ self.run()
return 0
def tracing(self, traceon):
"""Turn tracing on or off.
- traceon tracing flag
+ @param traceon: tracing flag
"""
if traceon == self.traceon:
return
self.traceon = traceon
if traceon:
- self.tracefile = open('/var/log/xend.trace', 'w+', 1)
+ self.tracefile = open(XEND_TRACE_FILE, 'w+', 1)
self.traceindent = 0
sys.settrace(self.trace)
try:
xroot = XendRoot.instance()
log.info("Xend Daemon started")
self.createFactories()
- self.listenMgmt()
self.listenEvent()
self.listenNotifier()
self.listenVirq()
self.netifCF = netif.NetifControllerFactory()
self.consoleCF = console.ConsoleControllerFactory()
- def listenMgmt(self):
- protocol = MgmtProtocol(self)
- s = os.path.join(CONTROL_DIR, MGMT_SOCK)
- if os.path.exists(s):
- os.unlink(s)
- return reactor.listenUNIXDatagram(s, protocol)
-
def listenEvent(self):
protocol = EventFactory(self)
return reactor.listenTCP(EVENT_PORT, protocol)
virqChan.registerClient(VirqClient(self))
def exit(self):
- reactor.diconnectAll()
+ reactor.disconnectAll()
sys.exit(0)
def getDomChannel(self, dom):